package work.javac.exec;

import work.javac.common.ConfigUtil;
import work.javac.common.FileReadWriter;
import work.javac.common.Log;
import work.javac.common.PathUtil;
import work.javac.common.database.SQLExecute;
import work.javac.common.database.utils.CopyUtil;
import work.javac.common.database.utils.DBUtils;
import work.javac.common.database.utils.SqlUtil;

import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;

public class Sync {

    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
    private static final String INIT = "init";
    private static final String TYPE = ".sql";

    public static void start(Map<String, String> parse) {
        CopyUtil util = new CopyUtil();
        util.setTABLENAME(parse.get("table"));
        util.setSQL(parse.get("sql"));
        util.setFILEPATH(parse.get("file"));
        if (parse.get("file") == null) {
            Log.info("数据不落地-同步模式");
            if (parse.get("output") == null) {
                Log.err("请检查参数(-o)输出数据源不能为空!");
                System.exit(1);
            }
            if (parse.get("input") == null) {
                Log.err("请检查参数(-i)输入数据源不能为空!");
                System.exit(1);
            }
            if (parse.get("sql") == null && parse.get("table") == null) {
                try {
                    t18Sync(parse);
                    Log.info("同步任务执行结束!");
                    return;
                } catch (SQLException e) {
                    Log.err(e.getMessage());
                    System.exit(1);
                }
            }
            // 如果没有给文件路径,那么说明是直接 两个数据源 表与表 直接传输
            try (Connection outConn = DBUtils.getConn(parse.get("output"));
                 Connection inConn = DBUtils.getConn(parse.get("input"))) {
                SQLExecute.executeCopy(outConn, inConn, util);
            } catch (SQLException e) {
                Log.err(e.getMessage());
                System.exit(1);
            }
            Log.info("同步任务执行结束!");
            return;
        }
        if (parse.get("output") != null) {
            Log.info("卸载数据文件模式");
            util.setTYPE("O");
            try (Connection conn = DBUtils.getConn(parse.get("output"))) {
                if (parse.get("table") != null) {
                    SQLExecute.getColumns(conn, parse.get("table")).forEach(item -> util.addCOLUMNNAME(item));
                }
                Log.info("从数据源:%s写出表:%s到文件:%s", parse.get("output"), parse.get("table"), parse.get("file"));
                SQLExecute.executeCopyFile(conn, util);
            } catch (SQLException | IOException e) {
                System.exit(1);
            }
            Log.info("从数据源:%s写出表:%s到文件:%s完成!", parse.get("output"), parse.get("table"), parse.get("file"));
        }
        if (parse.get("input") != null) {
            Log.info("从数据文件加载表模式");
            util.setTYPE("I");
            try (Connection conn = DBUtils.getConn(parse.get("input"))) {
                Log.info("从数据文件:%s写入数据源:%s的表:%s", parse.get("file"), parse.get("input"), parse.get("table"));
                SQLExecute.executeCopyFile(conn, util);
            } catch (SQLException | IOException e) {
                System.exit(1);
            }
            Log.info("从数据文件:%s写入数据源:%s的表:%s完成!", parse.get("file"), parse.get("input"), parse.get("table"));
        }
        Log.info("同步任务执行结束!");
    }

    private static void t18Sync(Map<String, String> map) throws SQLException {
        try (Connection conn = DBUtils.getConn(map.get("input"))) {
            SqlUtil util = new SqlUtil();
            util.setSCHEMA(ConfigUtil.get(map.get("input") + ".schema"));
            util.setColumns("SYNC_SQL, PRE_SQL, LANDING_TABLENAME, LANDING_SCHEMA");
            util.setTABLE_NAME("T18_SYNC");
            valid(conn, util);
            util.setWhere(" WHERE STATUS = '1' ");
            List<Map<String, Object>> list = SQLExecute.executeListMap(conn, util);
            Log.info("需要执行同步的作业数为:%s条", list.size());
            ExecutorService service = Executors.newWorkStealingPool(Integer.parseInt(ConfigUtil.get("sync.pool")));
            LinkedBlockingDeque<Future> futures = new LinkedBlockingDeque<>();
            String beginSql = "UPDATE T18_SYNC SET STATUS = ?, BEGIN_DATE = SYSDATE, ERROR_MSG = ?, SYNC_DATA_SIZE = ? WHERE SYNC_SQL = ? ";
            String endSql = "UPDATE T18_SYNC SET STATUS = ?, END_DATE = SYSDATE, ERROR_MSG = ?, SYNC_DATA_SIZE = ? WHERE SYNC_SQL = ? ";
            list.forEach(item -> {
                Future submit = service.submit(() -> {
                    StringBuffer sql = new StringBuffer();
                    Object[] data = new Object[4];
                    CopyUtil copy = new CopyUtil();
                    String syncSql = (String) item.get("SYNC_SQL");
                    String preSql = (String) item.get("PRE_SQL");
                    String landingTablename = (String) item.get("LANDING_TABLENAME");
                    String landingSchema = (String) item.get("LANDING_SCHEMA");
                    int from = syncSql.toUpperCase().indexOf("FROM");
                    if (from == -1) {
                        from = 0;
                        sql.append("SELECT * FROM ");
                    } else {
                        from += 4;
                    }
                    sql.append(syncSql);
                    int tableEnd = syncSql.toUpperCase().indexOf("PARTITION", from);
                    if (tableEnd == -1) {
                        tableEnd = syncSql.toUpperCase().indexOf("WHERE", from);
                    }
                    if (tableEnd == -1) {
                        tableEnd = syncSql.length();
                    }
                    String tableName = landingTablename == null ? syncSql.substring(from, tableEnd).trim() : landingTablename;
                    copy.setSQL(sql.toString());
                    copy.setSCHEMA(landingSchema);
                    copy.setTABLENAME(tableName);
                    try (Connection fromConn = DBUtils.getConn(map.get("output"));
                         Connection toConn = DBUtils.getConn(map.get("input"))) {
                        data[0] = "2";
                        data[1] = null;
                        data[2] = null;
                        data[3] = syncSql;
                        // 更新 T18_SYNC 2
                        SQLExecute.executeSQL(conn, beginSql, data);
                        if (preSql != null) {
                            // 执行前置操作语句
                            SQLExecute.executeSQL(conn, preSql);
                        }
                        // 执行同步
                        long size = SQLExecute.executeCopy(fromConn, toConn, copy);
                        data[0] = "3";
                        data[2] = size;
                        // 更新 T18_SYNC 3
                        SQLExecute.executeSQL(conn, endSql, data);
                    } catch (SQLException e) {
                        try {
                            data[0] = "0";
                            data[1] = e.getMessage();
                            //  更新 T18_SYNC 0
                            SQLExecute.executeSQL(conn, endSql, data);
                        } catch (SQLException err) {
                        }
                    }
                });
                futures.add(submit);
            });
            service.shutdown();
            futures.forEach(future -> {
                try {
                    future.get();
                } catch (Exception e) {
                    Log.err(e.getMessage());
                }
            });
        }
    }

    private static void valid(Connection conn, SqlUtil util) {
        util.setWhere(" LIMIT 0 ");
        try {
            SQLExecute.executeListMap(conn, util);
        } catch (SQLException e) {
            Log.err("写入数据源不存在T18_SYNC同步表！");
            init(conn);
        }
    }

    private static void init(Connection conn) {
        try {
            LocalDateTime localDateTime = LocalDateTime.now();
            File initFile = PathUtil.getFile(INIT + localDateTime.format(formatter) + TYPE);
            String schema = SQLExecute.getSchema(conn);
            String read = FileReadWriter.read(PathUtil.getResourceAsStream(INIT + TYPE));
            FileReadWriter.writer(initFile, read.replace("{schema}", schema));
            throw new RuntimeException(String.format("T18_SYNC初始化脚本生成完毕!路径:%s;请手动确认后在数据库执行!", initFile.getPath()));
        } catch (IOException | SQLException e) {
            Log.err(e.getMessage());
        }
    }

}
